;; This file is part of scheme-GNUnet.
;; Copyright (C) 2021 Maxime Devos
;;
;; scheme-GNUnet is free software: you can redistribute it and/or modify it
;; under the terms of the GNU Affero General Public License as published
;; by the Free Software Foundation, either version 3 of the License,
;; or (at your option) any later version.
;;
;; scheme-GNUnet is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
;;
;; SPDX-License-Identifier: AGPL3.0-or-later

(use-modules (gnu gnunet mq-impl stream)
	     (gnu gnunet mq)
	     (gnu gnunet mq handler)
	     (gnu gnunet utils hat-let)
	     (gnu gnunet utils bv-slice)
	     (gnu gnunet config db)
	     (gnu gnunet concurrency repeated-condition)
	     (fibers conditions)
	     (fibers operations)
	     (fibers)
	     ((rnrs arithmetic bitwise) #:select (bitwise-ior))
	     (rnrs bytevectors)
	     ((rnrs io ports) #:select (open-bytevector-input-port))
	     ((rnrs base) #:select (assert))
	     (rnrs hashtables)
	     ((rnrs exceptions) #:select (guard))
	     (srfi srfi-26)
	     (srfi srfi-43)
	     (rnrs io ports)
	     (ice-9 atomic)
	     (ice-9 binary-ports)
	     (ice-9 suspendable-ports)
	     (ice-9 control)
	     (ice-9 match)
	     (ice-9 threads)
	     (tests utils))

(define (no-sender . _)
  (error "no sender!"))

(define no-handlers (message-handlers))

(define (no-error-handler . _)
  (error "no error handler!"))

(test-begin "mq-stream")

(define (check-slice-equal slice bv)
  (let^ ((!! (assert (= (slice-length slice)
			(bytevector-length bv))))
	 (! slice-copy (make-bytevector (slice-length slice)))
	 (! copy (bv-slice/read-write slice-copy))
	 (<-- () (slice-copy! slice copy))
	 (!! (bytevector=? slice-copy bv)))
	(values)))

;; Without interposition, and the verifier always
;; returns #t.
(define (simple-handler the-type handle)
  (message-handler
   (type the-type)
   ((interpose code) code)
   ((well-formed? _) #t)
   ((handle! x) (handle x))))

;; Why isn't this the default?  This stops the process from
;; exiting instead of raising an EPIPE system-error when
;; writing to a broken pipe.
(sigaction SIGPIPE SIG_IGN)

(define (handle-input*! mq input)
  (call-with-values (lambda () (handle-input! mq input))
    (lambda e
      (apply inject-error! mq e)
      (values))))

(test-assert "messages + eof are injected in-order"
  (let^ ((! input/bv #vu8(0 4 0 1 ; Message type 1, size 4
			    0 5 0 2 1 ; Message type 2, size 6
			    0 6 0 3 2 1)) ; Message type 3, size 7
	 (! input (open-bytevector-input-port input/bv))
	 (! received 0)
	 (! (make-handler type expected-received expected-bv)
	    (simple-handler
	     type
	     (lambda (slice)
	       (assert (equal? received expected-received))
	       (check-slice-equal slice expected-bv)
	       (set! received (+ 1 received)))))
	 (! handler/1 (make-handler 1 0 #vu8(0 4 0 1)))
	 (! handler/2 (make-handler 2 1 #vu8(0 5 0 2 1)))
	 (! handler/3 (make-handler 3 2 #vu8(0 6 0 3 2 1)))
	 (! handlers
	    (message-handlers handler/1 handler/2 handler/3))
	 (! (error-handler . arguments)
	    (assert (equal? received 3))
	    (assert (equal? arguments '(input:regular-end-of-file)))
	    (set! received 'end-of-file))
	 (! mq (make-message-queue handlers error-handler no-sender))
	 (<-- () (handle-input*! mq input)))
	;; TODO: should the port be closed?
	(assert (equal? received 'end-of-file))))

(test-assert "overly small message is detected (--> stop)"
  (let^ ((! input/bv #vu8(0 4 0 0 ; Message type 0, size 4
			    0 3 9 ; Overly small message, size 3, type != 0
			    0 4 0 1)) ; Message type 1, size 4
	 ;; The first message is well-formatted and should therefore
	 ;; be injected.  The second one isn't, so an appropriate error should
	 ;; injected.  Then the message stream is broken, so the third
	 ;; message shouldn't be injected.
	 (! input (open-bytevector-input-port input/bv))
	 (! received 0)
	 (! handler/0
	    (simple-handler 0
			    (lambda (slice)
			      (assert (equal? received 0))
			      (check-slice-equal slice #vu8(0 4 0 0))
			      (set! received 1))))
	 (! handlers
	    (message-handlers handler/0))
	 (! (error-handler . arguments)
	    (assert (equal? received 1))
	    ;; Whether this malformed even has a message type is dubious,
	    ;; but if it has one, it will be (* 256 9).
	    (assert (equal? arguments `(input:overly-small ,(* 256 9) 3)))
	    (set! received 'overly-small))
	 (! mq (make-message-queue handlers error-handler no-sender))
	 (<-- () (handle-input*! mq input)))
	(assert (equal? received 'overly-small))))

(test-assert "premature eof is detected (--> stop)"
  (let^ ((! input/bv #vu8(0 8 7 6 5 4))
	 (! input (open-bytevector-input-port input/bv))
	 (! received #f)
	 (! (error-handler . arguments)
	    (assert (eq? received #f))
	    (assert (equal? arguments '(input:premature-end-of-file)))
	    (set! received #t))
	 (! mq (make-message-queue no-handlers error-handler no-sender))
	 (<-- () (handle-input*! mq input)))
	(assert (equal? received #t))))

(test-equal "envelopes are written (no blocking)"
  ;; Three messages
  #vu8(0 4 0 1
	 0 4 0 2
	 0 4 0 3)
  (let^ ((! messages #(#vu8(0 4 0 1)
			   #vu8(0 4 0 2)
			   #vu8(0 4 0 3)))
	 (<-- (port get-bytevector) (open-bytevector-output-port))
	 (! mq (make-message-queue no-handlers no-error-handler
				   (lambda (_) (values))))
	 (! (insert-message index message)
	    (send-message! mq (slice/read-only (bv-slice/read-write message))))
	 (<-- ()
	      (begin
		(vector-for-each insert-message messages)
		(values)))
	 (<-- ()
	      ;; The implementation detail that 'send-round'
	      ;; is called before 'wait!' is assumed here.
	      (let/ec ec
		(handle-output! mq port ec)
		(error "unreachable"))))
	(get-bytevector)))

(define (blocking-output-port port . block-positions)
  (define (close)
    (close-port port))
  (define (write! bv index length)
    (define p (port-position port))
    (if (or (null? block-positions)
	    (< (+ p length) (car block-positions)))
	(begin (put-bytevector port bv index length) length)
	(let ((short (- (car block-positions) p)))
	  (put-bytevector port bv index short)
	  ((current-write-waiter) port/blocking)
	  (set! block-positions (cdr block-positions))
	  short)))
  (define port/blocking
    (make-custom-binary-output-port "" write! #f #f close))
  (setvbuf port/blocking 'none)
  port/blocking)

;; The ‘blocking’ is to make this test case more interesting.
;; It does not currently have any effect, but it is expected
;; that the implementation of handle-output! will be changed
;; to react to blocking, for implementing message queue
;; shutdown.

(test-equal "repeatable conditions can be used (blocking)"
  '(#vu8(0 4 0 1   0 4 0 2) . 4) ; 4: number of times writing blocks
  (let^ ((! rcvar (make-repeated-condition))
	 (! stop? (make-condition))
	 (! stopped? (make-condition))
	 (! (interrupt! mq)
	    (trigger-condition! rcvar))
	 (! escape/output (make-parameter #f))
	 (<-- (out/internal get-bytevector)
	      (open-bytevector-output-port))
	 ;; block writing a few times
	 (! out (blocking-output-port out/internal 0 1 3 7))
	 (! (wait!)
	    (perform-operation
	     (apply choice-operation
		    (prepare-await-trigger! rcvar)
		    (if (>= 8 (port-position out/internal))
			(list (wrap-operation
			       (wait-operation stop?)
			       (lambda () ((escape/output)))))
			'()))))
	 (! mq (make-message-queue no-handlers no-error-handler interrupt!))
	 (! n/blocked 0)
	 (! message/1 #vu8(0 4 0 1))
	 (! message/2 #vu8(0 4 0 2)))
	(run-fibers
	 (lambda ()
	   (spawn-fiber
	    (lambda ()
	      (let/ec ec
		(parameterize ((escape/output ec)
			       (current-write-waiter
				(lambda (port)
				  (cond ((eq? port out)
					 (set! n/blocked (+ n/blocked 1)))
					((file-port? port)
					 ;; XXX ‘Attempt to suspend fiber within
					 ;; continuaton barrier’
					 #;((@@ (fibers) wait-for-writable) port)
					 (select '() (list port) '()))))))
		  (handle-output! mq out wait!)))
	      (signal-condition! stopped?)))
	   (send-message! mq (bv-slice/read-write message/1))
	   (sleep 0.001)
	   (send-message! mq (bv-slice/read-write message/2))
	   (sleep 0.001)
	   (signal-condition! stop?)
	   (wait stopped?)
	   (cons (get-bytevector) n/blocked))
	 #:parallelism 1
	 #:hz 0)))

(define (call-with-temporary-directory proc)
  (let ((file (mkdtemp (in-vicinity (or (getenv "TMPDIR") "/tmp")
				    "test-XXXXXX"))))
    (with-exception-handler
	(lambda (e)
	  (system* "rm" "-r" file)
	  (raise-exception e))
      (lambda ()
	(call-with-values
	    (lambda () (proc file))
	  (lambda the-values
	    (system* "rm" "-r" file)
	    (apply values the-values)))))))

(define (make-config where)
  (hash->configuration
   (alist->hash-table
    `((("service" . "UNIXPATH") . ,where)))))

(define (call-with-socket-location proc)
  (call-with-temporary-directory
   (lambda (dir)
     (define where (in-vicinity dir "sock.et"))
     (define config  (make-config where))
     (proc where config))))

(define (connect/test config connected?)
  (define (error-handler . error)
    (match error
      ;; The connection is closed by 'test-connection'.
      ;; If 'test-connection' doesn't close the connection,
      ;; then the GC would.  In both cases, this error would
      ;; happen.
      (('input:regular-end-of-file) (values))
      (('connection:connected) (signal-condition! connected?))))
  (connect/fibers config "service" no-handlers error-handler
		  #:spawn call-with-new-thread))

(define (alist->hash-table alist)
  (define h (make-hashtable (lambda (key) 0) equal?))
  (define (insert! key+value)
    (hashtable-set! h (car key+value) (cdr key+value)))
  (for-each insert! alist)
  h)

(define (test-connection mq server-sock)
  (send-message! mq (bv-slice/read-write #vu8(0 4 0 0)))
  (let ((client (car (accept server-sock))))
    (assert (equal? #vu8(0 4 0 0) (get-bytevector-n client 4)))
    (close-port client)
    #t))

(define (yield-many)
  ;; Give the new threads some time to run before binding the socket.
  ;; This allowed a bug in the use of 'connect' to be detected.
  (let loop ((n (* 8 (+ 1 (length (all-threads))))))
    (when (> n 0)
      (yield)
      (loop (- n 1)))))

(test-assert "connect-unix, can connect when socket is already listening"
  (call-with-socket-location
   (lambda (where config)
     (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
     (define connected? (make-condition))
     (bind listening-sock AF_UNIX where)
     (listen listening-sock 1)
     (define mq (connect/test config connected?))
     (wait connected?)
     (test-connection mq listening-sock))))

;; Consider the case where a service starts, has bound its socket
;; but is not yet listening, and a client connects.
(test-assert "connect-unix, will connect when socket is listening"
  (call-with-socket-location
   (lambda (where config)
     (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
     (define connected? (make-condition))
     (bind listening-sock AF_UNIX where)
     (define mq (connect/test config connected?))
     (yield-many)
     (listen listening-sock 1)
     (wait connected?)
     (test-connection mq listening-sock))))

;; Consider the case where a client starts before a service.
(test-assert "connect-unix, will connect when socket is bound (and listening)"
  (call-with-socket-location
   (lambda (where config)
     (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
     (define connected? (make-condition))
     (define mq (connect/test config connected?))
     (yield-many)
     (bind listening-sock AF_UNIX where)
     (listen listening-sock 1)
     (wait connected?)
     (test-connection mq listening-sock))))

;; Consider the case where a service starts and stops,
;; a client connects and the service restarts.
(test-assert
    "connect-unix, will connect even if there's an old socket lying around"
  (call-with-socket-location
   (lambda (where config)
     (let ((old-sock (socket PF_UNIX SOCK_STREAM 0)))
       (bind old-sock AF_UNIX where)
       (close-port old-sock))
     (define connected? (make-condition))
     (define mq (connect/test config connected?))
     (yield-many)
     (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
     (yield-many)
     ;; Delete the old socket, otherwise the 'bind' below results in ‘address alreay in use’
     (delete-file where)
     (yield-many)
     (bind listening-sock AF_UNIX where)
     (yield-many)
     (listen listening-sock 1)
     (wait connected?)
     (test-connection mq listening-sock))))

;; Consider the case where GNUnet version N uses stream sockets,
;; GNUnet version M uses datagram sockets, the system initially
;; uses GNUnet version N, a client for version M is started
;; (initially failing to connect to the server), then the system
;; switches to GNUnet version M.
(test-assert
    "connect-unix, will connect even if previous socket is different type"
  (call-with-socket-location
   (lambda (where config)
     (define old-sock (socket PF_UNIX SOCK_DGRAM 0))
     (bind old-sock AF_UNIX where)
     ;; Datagram sockets don't support 'listen', so don't
     ;; call 'listen' with 'old-sock'.
     (define connected? (make-condition))
     (define mq (connect/test config connected?))
     (yield-many)
     (close-port old-sock)
     (delete-file where)
     (define new-sock (socket PF_UNIX SOCK_STREAM 0))
     (bind new-sock AF_UNIX where)
     (listen new-sock 1)
     (wait connected?)
     (test-connection mq new-sock))))

;; Consider a system that creates directories and the socket
;; with world-unreadable, world-unexecutable permissions at
;; first and makes the permissions more permissive later.
(test-assert
    "connect-unix, will connect even if permissions are temporarily wrong"
  (call-with-temporary-directory
   (lambda (tmpdir)
     ;; Permissions on sockets can be unreliable on some systems,
     ;; so modify the permissions of a directory instead.
     (define subdir (in-vicinity tmpdir "dir"))
     (mkdir subdir)
     (define where (in-vicinity subdir "sock.et"))
     (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
     (bind listening-sock AF_UNIX where)
     (listen listening-sock 1)
     (chmod subdir #o000) ; unreadable
     (define connected? (make-condition))
     (define mq (connect/test (make-config where) connected?))
     (yield-many)
     ;; make it readable again
     ;; (and writable such that 'tmpdir' can be deleted).
     (chmod subdir #o700)
     (wait connected?)
     (test-connection mq listening-sock))))

(test-assert "port->message-queue, can send/receive between pairs"
  (run-fibers
   (lambda ()
     ;; Create two message queues connected to each other
     ;; over a socket pair.  Send '1' over the first message queue
     ;; and expect to receive it from the second queue, and send '0'
     ;; over the second message queue and expect to receive it from
     ;; the first.
     (define pair (socketpair AF_UNIX SOCK_STREAM 0))
     ;; As 'fibers' is used instead of POSIX threads, set O_NONBLOCK.
     (make-nonblocking! (car pair))
     (make-nonblocking! (cdr pair))
     (define received/0 #f)
     (define received/1 #f)
     (define done/0 (make-condition))
     (define done/1 (make-condition))
     (define handlers/0
       (message-handlers
	(simple-handler 0
			(lambda (slice)
			  (assert (not received/0))
			  (check-slice-equal slice #vu8(0 4 0 0))
			  (set! received/0 #t)
			  (signal-condition! done/0)))))
     (define handlers/1
       (message-handlers
	(simple-handler 1
			(lambda (slice)
			  (assert (not received/1))
			  (check-slice-equal slice #vu8(0 4 0 1))
			  (set! received/1 #t)
			  (signal-condition! done/1)))))
     (define error-handler no-error-handler)
     (define mq/0 (port->message-queue (car pair) handlers/0 error-handler))
     (define mq/1 (port->message-queue (cdr pair) handlers/1 error-handler))
     (send-message! mq/0 (bv-slice/read-write #vu8(0 4 0 1)))
     (send-message! mq/1 (bv-slice/read-write #vu8(0 4 0 0)))
     (wait done/0)
     (wait done/1)
     #t)))

(define (two-sockets)
  (define sp (socketpair AF_UNIX SOCK_STREAM 0))
  (make-nonblocking! (car sp))
  (make-nonblocking! (cdr sp))
  (values (car sp) (cdr sp)))

(test-assert "input eof detected --> handle-input/output! stops (port->message-queue)"
  (call-with-spawner/wait
   (lambda (spawn)
     (define-values (alpha beta) (two-sockets))
     (define end-of-file (make-condition))
     (define (error-handler . e)
       (assert (equal? e '(input:regular-end-of-file)))
       ;; only one end-of-file notification
       (assert (signal-condition! end-of-file)))
     (define mq/alpha
       (port->message-queue alpha no-handlers error-handler
			    #:spawn spawn))
     ;; Give the fibers started by 'port->message-queue' a chance to block.
     (yield-many)
     ;; Let 'beta' stop writing, such that 'alpha' receives an end-of-file.
     ;; But keep the 'write' end of 'alpha' / 'read' end of 'beta' open to
     ;; complicate matters.
     (shutdown beta 1)
     (wait end-of-file)
     (define sent? (make-atomic-box #f))
     ;; Attempt to write a message, even though the connection is (half-)closed.
     ;; It should not actually be sent.
     (send-message! mq/alpha (bv-slice/read-write #vu8(0 4 0 0))
		    #:notify-sent!
		    (lambda ()
		      ;; strictly speaking, this does not mean the message was
		      ;; sent, but it's close enough for this test's purposes.
		      (atomic-box-set! sent? #t)))
     ;; Give 'handle-output!' a chance to (faultively) sent the message.
     (yield-many)
     (sleep 0.1) ; the yield-many above is apparently insufficient
     (assert (not (atomic-box-ref sent?)))
     ;; If it didn't try to sent the message, that presumably means the
     ;; 'handle-output!' fiber has completed.
     #t)
   ;; Should make 'yield-many' less fragile.
   #:parallelism 1))

(define (%false-if-broken-pipe thunk)
  "Call @var{thunk} in an environment where EPIPE system errors are caught.
If an EPIPE system error is raised, return #f."
  (guard (c ((and (eq? 'system-error (exception-kind c))
		  (= EPIPE (car (list-ref (exception-args c) 3))))
	     #f))
    (thunk)))

(define-syntax-rule (false-if-broken-pipe exp exp* ...)
  ;; See %false-if-broken-pipe
  (%false-if-broken-pipe
   (lambda ()
     exp exp* ...)))

(test-assert "closed for writing --> handle-input! stops (port->message-queue)"
  (call-with-spawner/wait
   (lambda (spawn)
     (define-values (alpha beta) (two-sockets))
     (define received? (make-atomic-box #f))
     (define end-of-file (make-condition))
     (define (receive! slice)
       (assert (not (atomic-box-ref received?)))
       (atomic-box-set! received? #t)
       (error "shouldn't be received"))
     (define (error-handler . e)
       (pk 'e e)
       (assert (equal? e '(input:regular-end-of-file)))
       ;; only one end-of-file notification
       (assert (signal-condition! end-of-file)))
     (define mq/alpha
       (port->message-queue alpha
			    (message-handlers
			     (simple-handler 0 receive!))
			    error-handler
			    #:spawn spawn))
     ;; Give the new fibers a chance to block.
     (yield-many)
     ;; Let 'beta' stop reading, such that 'alpha' is closed for writing.
     ;; But keep the 'read' end of 'alpha' open to complicate matters.
     (shutdown beta 0)
     ;; TODO: fibers doesn't have an option for waiting for EPOLLRDHUP
     ;; or EPOLLERR, so the code cannot immediately detect that 'alpha'
     ;; cannot be written to anymore.  Instead, 'handle-output!' will
     ;; detect the unwritability when it tries to write something.
     (send-message! mq/alpha (bv-slice/read-write #vu8(0 4 0 9)))
     ;; The end-of-file error should be injected, even though the socket
     ;; is half-duplex and only the write end is closed, because message
     ;; queues do not have a notion of half-duplex connections.
     (pk 'waiting)
     (wait end-of-file)
     ;; Attempt to read a message (after buffering a message), even though
     ;; the connection is half-closed.  Ignore broken pipe errors here:
     ;; if a ‘broken pipe’ error happens here, that means ALPHA was closed,
     ;; which is correct (tested in "port is closed at output").
     (false-if-broken-pipe (put-bytevector beta #vu8(0 4 0 0)))
     ;; As the 'handle-input!' fiber should have exited already, 'receive!'
     ;; shouldn't be called.
     (yield-many)
     (sleep 0.1) ; might not be necessary anymore
     #t)
   ;; Should make 'yield-many' less fragile.
   #:parallelism 1))

;; This detects the absence of the parametrisation of 'current-write-waiter'.
(test-assert "writer blocking and closed for reading --> all fibers stop"
  (call-with-spawner/wait
   (lambda (spawn)
     (define-values (alpha beta) (two-sockets))
     ;; Fill the writing pipe, such that the writing fiber will block.
     #;(fcntl alpha SO_SNDBUF 1) ; doesn't work on sockets on Linux ..
     ;; Simply writing a byte isn't sufficient, as the kernel can
     ;; impose a minimum buffer size.
     (define old-waiter (current-write-waiter))
     (let/ec ec
       (parameterize ((current-write-waiter
		       (lambda (port)
			 (if (eq? port alpha)
			     (ec)
			     ;; maybe a backtrace
			     (old-waiter port)))))
	 (define bv (make-bytevector 4096))
	 (let loop ()
	   (put-bytevector alpha bv)
	   (loop))))
     (define closed-condition (make-condition))
     (define (error-handler e)
       (assert (eq? e 'input:regular-end-of-file))
       (unless (signal-condition! closed-condition)
	 (error "already saw end of file")))
     (define mq (port->message-queue alpha no-handlers error-handler
				     #:spawn spawn))
     (send-message! mq (bv-slice/read-write #vu8(0 4 0 0)))
     (define (notify-sent)
       ;; if the mq-stream implementation implemented buffering by itself
       ;; this would actually be possible.
       (error "impossible, should be blocking by now!"))
     (send-message! mq
		    (bv-slice/read-write #vu8(0 4 0 0))
		    #:notify-sent! notify-sent)
     (pk 'send2)
     ;; Give the write fiber a chance to block.
     (yield-many)
     (sleep 0.1)
     (shutdown alpha 0)
     #t)
   ;; Should make 'yield-many' less fragile.
   #:parallelism 1))
;; ^ if this test blocks, that means not all fibers have stopped

(test-assert "output buffers are flushed"
  (let^ ((<-- (alpha beta) (two-sockets))
         ;; In Guile, by default, new sockets are unbuffered.
         ;; Add a buffer.
         (<-- _ (setvbuf alpha 'block 64))
         (<-- _ (setvbuf beta 'block 64))
         (! mq (make-message-queue no-handlers no-error-handler
                                   (lambda (_) (values))))
         ;; Send a message.  As the message is smaller than the buffer size,
         ;; it will be buffered unless 'handle-output!' takes special action.
         (! _ (send-message! mq (slice/read-only (bv-slice/read-write #vu8(0 4 0 0)))))
         (! waited?
            (let/ec ec
              (let ((old-waiter (current-write-waiter)))
                (parameterize ((current-write-waiter
                                (lambda (p)
                                  (if (eq? p alpha)
                                      (ec #t)
                                      (old-waiter p)))))
                  (handle-output! mq alpha
                                  (lambda ()
                                    (pk 'waiting...)
                                    ((pk 'escaping ec) #f)
                                    (pk 'escaped!)))))))
         ;; If HANDLE-OUTPUT! blocked, that meant the underlying system call
         ;; was called, so the kernel got (some of the) data and all is well
         ;; -- except that the kernel buffer size of 4 bytes seems rather tiny.
         (? waited?
            (format (current-error-port) "≤4 bytes seems rather small~%")
            #t)
	 (! old-read-waiter (current-read-waiter)) )
        ;; If waited? is false, that means HANDLE-OUTPUT! succeeded and now
        ;; the bytes are in Guile's or the kernel's buffers.  Test if they
        ;; are in the kernel's.
	(let/ec ec
	  (equal? #vu8(0 4 0 0)
		  (parameterize ((current-read-waiter
				  (lambda (p)
				    (if (eq? p beta)
					(ec #f)
					(old-read-waiter p)))))
		    (get-bytevector-some beta))))))

(define (error-handler/regular . e)
  (match e
    ('(input:regular-end-of-file) (values))
    (_ (error "what ~a" e))))

(test-assert "port is closed at input eof"
  (call-with-spawner/wait
   (lambda (spawn)
     (define-values (alpha beta) (two-sockets))
     (define q (port->message-queue alpha no-handlers error-handler/regular
				    #:spawn spawn))
     (shutdown alpha 0)
     (yield-many)
     (sleep 0.05) ;; XXX yield-many above is unsufficient
     (port-closed? alpha))
   #:parallelism 1)) ; to make the use of yield-many less fragile

(test-assert "port is closed at output eof"
  (call-with-spawner/wait
   (lambda (spawn)
     (define-values (alpha beta) (two-sockets))
     (define mq (port->message-queue alpha no-handlers error-handler/regular
				     #:spawn spawn))
     (shutdown alpha 1)
     ;; XXX It's not possible for the output eof to be waited for currently,
     ;; so attempt to send a message to wake up the writing fiber.
     (send-message! mq (bv-slice/read-write #vu8(0 4 0 0)))
     (yield-many)
     (sleep 0.05) ;; XXX yield-many above is unsufficient
     (port-closed? alpha))
   #:parallelism 1)) ; to make the use of yield-many less fragile

(test-assert "port is closed at input/output eof"
  (call-with-spawner/wait
   (lambda (spawn)
     (define-values (alpha beta) (two-sockets))
     (define q (port->message-queue alpha no-handlers error-handler/regular
				    #:spawn spawn))
     (shutdown alpha 2)
     (yield-many)
     (sleep 0.05) ;; XXX yield-many above is unsufficient
     (port-closed? alpha))
   #:parallelism 1)) ; to make the use of yield-many less fragile

(test-assert "fibers stop and port closed after close! (directly after creation)"
  (let^ ((<-- (alpha beta) (two-sockets)))
	(call-with-spawner/wait
	 (lambda (spawn)
	   (define q (port->message-queue alpha no-handlers error-handler/regular
					  #:spawn spawn))
	   (close-queue! q))
	 #:parallelism 1)
	(port-closed? alpha)))

(test-assert "fibers stop and port closed after close! (some times passes)"
  (let^ ((<-- (alpha beta) (two-sockets)))
	(call-with-spawner/wait
	 (lambda (spawn)
	   (define q (port->message-queue alpha no-handlers error-handler/regular
					  #:spawn spawn))
	   (yield-many)
	   (sleep 0.01)
	   (close-queue! q))
	 #:parallelism 1)
	(port-closed? alpha)))

(test-assert "can close while still connecting (--> interrupted)"
  (call-with-socket-location
   (lambda (where config)
     (call-with-spawner/wait
      (lambda (spawn)
	(define interrupted? #f)
	(define cond (make-condition))
	(define (error-handler . e)
	  (match e
	    ('(connection:interrupted)
	     (begin
	       (pk 'interrupted)
	       (assert (not interrupted?))
	       (set! interrupted? #t)
	       (signal-condition! cond)))
	    (_ (error "what ~a" e))))
	(define mq (connect/fibers config "service" no-handlers error-handler
				   #:spawn spawn))
	(close-queue! mq)
	(wait cond)
	#t)))))

(test-assert "can close after being connected (--> regular-end-of-file)"
  (call-with-socket-location
   (lambda (where config)
     (call-with-spawner/wait
      (lambda (spawn)
	(define connected? #f)
	(define connected-condition (make-condition))
	(define disconnected? #f)
	(define disconnected-condition (make-condition))
	(define (error-handler . e)
	  (match e
	    ('(connection:connected)
	     (pk 'connected)
	     (assert (not connected?))
	     (set! connected? #t)
	     (signal-condition! connected-condition))
	    ('(input:regular-end-of-file)
	     (assert connected?)
	     (assert (not disconnected?))
	     (set! disconnected? #t)
	     (signal-condition! disconnected-condition))
	    (_ (error "what ~a" e))))
	(define mq (connect/fibers config "service" no-handlers error-handler
				   #:spawn spawn))
	(spawn
	 (lambda ()
	   (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
	   (bind listening-sock AF_UNIX where)
	   (listen listening-sock 1)
	   ;; Make it non-blocking (because guile-fibers is used)
	   (make-nonblocking! listening-sock)
	   ;; Not actually interested in the return value
	   (accept listening-sock)))
	(wait connected-condition)
	(assert (not disconnected?))
	(close-queue! mq)
	(wait disconnected-condition)
	#t)))))

(test-end "mq-stream")
